/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package software.amazon.awssdk.enhanced.dynamodb.functionaltests;

import static java.util.stream.Collectors.toList;
import static software.amazon.awssdk.enhanced.dynamodb.extensions.AutoGeneratedUuidExtension.AttributeTags.autoGeneratedUuidAttribute;
import static software.amazon.awssdk.enhanced.dynamodb.internal.AttributeValues.stringValue;
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.primaryPartitionKey;
import static software.amazon.awssdk.enhanced.dynamodb.mapper.StaticAttributeTags.updateBehavior;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbEnhancedClient;
import software.amazon.awssdk.enhanced.dynamodb.DynamoDbTable;
import software.amazon.awssdk.enhanced.dynamodb.Expression;
import software.amazon.awssdk.enhanced.dynamodb.OperationContext;
import software.amazon.awssdk.enhanced.dynamodb.TableMetadata;
import software.amazon.awssdk.enhanced.dynamodb.TableSchema;
import software.amazon.awssdk.enhanced.dynamodb.extensions.AutoGeneratedUuidExtension;
import software.amazon.awssdk.enhanced.dynamodb.extensions.annotations.DynamoDbAutoGeneratedUuid;
import software.amazon.awssdk.enhanced.dynamodb.internal.operations.DefaultOperationContext;
import software.amazon.awssdk.enhanced.dynamodb.mapper.StaticTableSchema;
import software.amazon.awssdk.enhanced.dynamodb.mapper.UpdateBehavior;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbFlatten;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbUpdateBehavior;
import software.amazon.awssdk.enhanced.dynamodb.model.PutItemEnhancedRequest;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.ConditionalCheckFailedException;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemRequest;
import software.amazon.awssdk.services.dynamodb.model.GetItemResponse;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;

@RunWith(Parameterized.class)
public class AutoGeneratedUuidRecordTest extends LocalDynamoDbSyncTestBase{

    // Canonical textual UUID layout: 8-4-4-4-12 hexadecimal digits, either letter case.
    private static final String UUID_REGEX =
        "^[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}$";

    // Compiled once and shared by every assertion in this class.
    private static final Pattern UUID_PATTERN = Pattern.compile(UUID_REGEX);

    /**
     * Asserts that the given value is a non-null string in the 8-4-4-4-12 hexadecimal UUID layout.
     *
     * <p>The assertion is made on the string itself rather than on a boolean, so a failure message shows
     * the offending value, and a {@code null} value fails the assertion instead of raising a
     * {@link NullPointerException} from the matcher.
     *
     * @param uuid the value to validate
     */
    public static void assertValidUuid(String uuid) {
        Assertions.assertThat(uuid)
                  .as("auto-generated UUID")
                  .isNotNull()
                  .matches(UUID_PATTERN);
    }

    // Logical table name; the same literal is passed to getConcreteTableName(...) elsewhere in this class.
    private static final String TABLE_NAME = "table-name";
    // Operation context for the primary index of TABLE_NAME. Not referenced by any test in this class.
    private static final OperationContext PRIMARY_CONTEXT =
        DefaultOperationContext.create(TABLE_NAME, TableMetadata.primaryIndexName());


    /**
     * Creates one parameterized test instance whose mapped table is backed by an enhanced client that has
     * the {@link AutoGeneratedUuidExtension} registered.
     *
     * @param testName          label of the parameter set supplied by {@link #data()}
     * @param recordTableSchema schema used to map {@link Record} for this run
     */
    public AutoGeneratedUuidRecordTest(String testName, TableSchema<Record> recordTableSchema) {
        DynamoDbEnhancedClient enhancedClient =
            DynamoDbEnhancedClient.builder()
                                  .dynamoDbClient(getDynamoDbClient())
                                  .extensions(AutoGeneratedUuidExtension.create())
                                  .build();

        this.testCaseName = testName;
        this.mappedTable = enhancedClient.table(getConcreteTableName(TABLE_NAME), recordTableSchema);
    }

    /**
     * Static schema for {@link FlattenedRecord}: a single string attribute named "generated" that is
     * tagged as an auto-generated UUID. It is flattened into {@code TABLE_SCHEMA} below.
     */
    private static final TableSchema<FlattenedRecord> FLATTENED_TABLE_SCHEMA =
        StaticTableSchema.builder(FlattenedRecord.class)
                         .newItemSupplier(FlattenedRecord::new)
                         .addAttribute(String.class, a -> a.name("generated")
                                                           .getter(FlattenedRecord::getGenerated)
                                                           .setter(FlattenedRecord::generated)
                                                           .tags(autoGeneratedUuidAttribute()))
                         .build();

    /**
     * Static schema for {@link Record}, declaring the same tags that the bean annotations on
     * {@link Record} declare:
     * <ul>
     *   <li>"id" - primary partition key</li>
     *   <li>"attribute" - plain string attribute</li>
     *   <li>"lastUpdatedUuid" - auto-generated UUID with no explicit update behavior</li>
     *   <li>"createdUuid" - auto-generated UUID tagged with {@code UpdateBehavior.WRITE_IF_NOT_EXISTS}</li>
     *   <li>the attributes of {@code FLATTENED_TABLE_SCHEMA}, flattened into this item</li>
     * </ul>
     */
    private static final TableSchema<Record> TABLE_SCHEMA =
        StaticTableSchema.builder(Record.class)
                         .newItemSupplier(Record::new)
                         .addAttribute(String.class, a -> a.name("id")
                                                           .getter(Record::getId)
                                                           .setter(Record::id)
                                                           .tags(primaryPartitionKey()))
                         .addAttribute(String.class, a -> a.name("attribute")
                                                           .getter(Record::getAttribute)
                                                           .setter(Record::attribute))
                         .addAttribute(String.class, a -> a.name("lastUpdatedUuid")
                                                           .getter(Record::getLastUpdatedUuid)
                                                           .setter(Record::lastUpdatedUuid)
                                                           .tags(autoGeneratedUuidAttribute()))
                         .addAttribute(String.class, a -> a.name("createdUuid")
                                                           .getter(Record::getCreatedUuid)
                                                           .setter(Record::createdUuid)
                                                           .tags(autoGeneratedUuidAttribute(),
                                                                 updateBehavior(UpdateBehavior.WRITE_IF_NOT_EXISTS)))
                         // Pulls the attributes of the nested FlattenedRecord up into the Record item.
                         .flatten(FLATTENED_TABLE_SCHEMA, Record::getFlattenedRecord, Record::flattenedRecord)
                         .build();


    private final List<Map<String, AttributeValue>> fakeItems =
        IntStream.range(0, 4)
                 .mapToObj($ -> createUniqueFakeItem())
                 .map(fakeItem -> TABLE_SCHEMA.itemToMap(fakeItem, true))
                 .collect(toList());
    private DynamoDbTable<Record> mappedTable;

    private final String concreteTableName;

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    {
        concreteTableName = getConcreteTableName("table-name");
    }

    private String testCaseName;


    /**
     * Supplies the parameter sets for this test: each entry is a display label followed by the
     * {@link TableSchema} used to map {@link Record} - once built statically and once derived from the bean.
     */
    @Parameters(name = "{index};  {0}")
    public static Collection<Object[]> data() {
        Object[] staticSchemaCase = {"StaticTableSchema Schema assigned", TABLE_SCHEMA};
        Object[] beanSchemaCase = {"Bean Schema assigned", TableSchema.fromBean(Record.class)};
        return Arrays.asList(staticSchemaCase, beanSchemaCase);
    }

    /**
     * Creates the mapped table with the default provisioned throughput before each test.
     */
    @Before
    public void createTable() {
        mappedTable.createTable(request -> request.provisionedThroughput(getDefaultProvisionedThroughput()));
    }

    /**
     * Deletes the table created for the test so that every test starts from an empty table.
     */
    @After
    public void deleteTable() {
        DeleteTableRequest deleteRequest = DeleteTableRequest.builder()
                                                             .tableName(getConcreteTableName(TABLE_NAME))
                                                             .build();
        getDynamoDbClient().deleteTable(deleteRequest);
    }


    /**
     * Puts a record that carries no UUID values and verifies that, when read back, all three
     * auto-generated attributes (created, last-updated and the flattened one) hold valid and
     * pairwise distinct UUIDs.
     */
    @Test
    public void putNewRecordSetsInitialAutoGeneratedUuid() {
        Record item = new Record().id("id").attribute("one");
        mappedTable.putItem(r -> r.item(item));
        Record result = mappedTable.getItem(r -> r.key(k -> k.partitionValue("id")));

        // All UUIDs generated match the UUID pattern. Checked before uniqueness so that a missing
        // value is reported by the validity assertion rather than by the comparison below.
        assertRecordHasValidUuid(result);

        // All UUIDs generated are unique: compares every pair, including created vs. flattened.
        Assertions.assertThat(Arrays.asList(result.getCreatedUuid(),
                                            result.getLastUpdatedUuid(),
                                            result.getFlattenedRecord().getGenerated()))
                  .doesNotHaveDuplicates();
    }

    /**
     * Asserts that each auto-generated attribute of the record - created, last-updated and the one
     * on the flattened record - is a valid UUID, in that order.
     *
     * @param result the record read back from the table
     */
    private static void assertRecordHasValidUuid(Record result) {
        String created = result.getCreatedUuid();
        assertValidUuid(created);

        String lastUpdated = result.getLastUpdatedUuid();
        assertValidUuid(lastUpdated);

        FlattenedRecord flattened = result.getFlattenedRecord();
        assertValidUuid(flattened.getGenerated());
    }

    /**
     * Puts a new record and then updates it, verifying after each write that all generated UUIDs are
     * valid and pairwise distinct, that the WRITE_IF_NOT_EXISTS attribute survives the update unchanged,
     * and that the remaining generated attributes are replaced by the update.
     */
    @Test
    public void putItemFollowedByUpdates() {
        mappedTable.putItem(r -> r.item(new Record().id("id").attribute("newItem")));
        Record result = mappedTable.getItem(r -> r.key(k -> k.partitionValue("id")));

        String createdUuidAfterPut = result.getCreatedUuid();
        String lastUpdatedUuidAfterPut = result.getLastUpdatedUuid();
        String flattenedUuidAfterPut = result.getFlattenedRecord().getGenerated();

        assertRecordHasValidUuid(result);

        // All UUIDs generated by the put are unique (every pair is compared)
        Assertions.assertThat(Arrays.asList(createdUuidAfterPut, lastUpdatedUuidAfterPut, flattenedUuidAfterPut))
                  .doesNotHaveDuplicates();

        // UPDATE
        mappedTable.updateItem(r -> r.item(new Record().id("id").attribute("UpdatedItem")));

        Record afterUpdate = mappedTable.getItem(r -> r.key(k -> k.partitionValue("id")));
        assertRecordHasValidUuid(afterUpdate);

        // All UUIDs present after the update are unique (every pair is compared)
        Assertions.assertThat(Arrays.asList(afterUpdate.getCreatedUuid(),
                                            afterUpdate.getLastUpdatedUuid(),
                                            afterUpdate.getFlattenedRecord().getGenerated()))
                  .doesNotHaveDuplicates();

        // UpdateBehavior.WRITE_IF_NOT_EXISTS , the old UUID is not changed
        Assertions.assertThat(afterUpdate.getCreatedUuid()).isEqualTo(createdUuidAfterPut);

        // UpdateBehavior.WRITE_ALWAYS
        Assertions.assertThat(afterUpdate.getLastUpdatedUuid()).isNotEqualTo(lastUpdatedUuidAfterPut);
        Assertions.assertThat(afterUpdate.getFlattenedRecord().getGenerated()).isNotEqualTo(flattenedUuidAfterPut);
        Assertions.assertThat(afterUpdate.getAttribute()).isEqualTo("UpdatedItem");
        Assertions.assertThat(afterUpdate.getId()).isEqualTo("id");
    }

    /**
     * Overwrites an existing record with a conditional put whose condition matches the stored item and
     * verifies that every auto-generated UUID is regenerated, including the WRITE_IF_NOT_EXISTS one.
     */
    @Test
    public void putExistingRecordWithConditionExpressions() {
        mappedTable.putItem(r -> r.item(new Record().id("newId").attribute("one")));
        Record result = mappedTable.getItem(r -> r.key(k -> k.partitionValue("newId")));
        assertRecordHasValidUuid(result);
        String createdUuidAfterPut = result.getCreatedUuid();
        String lastUpdatedUuidAfterPut = result.getLastUpdatedUuid();
        String flattenedUuidAfterPut = result.getFlattenedRecord().getGenerated();

        // The stored attribute is "one", so the first alternative of the condition holds.
        Expression conditionExpression = Expression.builder()
                                                   .expression("#k = :v OR #k = :v1")
                                                   .putExpressionName("#k", "attribute")
                                                   .putExpressionValue(":v", stringValue("one"))
                                                   .putExpressionValue(":v1", stringValue("wrong2"))
                                                   .build();

        mappedTable.putItem(PutItemEnhancedRequest.builder(Record.class)
                                                  .item(new Record().id("newId").attribute("conditionalUpdate"))
                                                  .conditionExpression(conditionExpression)
                                                  .build());

        Record afterUpdate = mappedTable.getItem(r -> r.key(k -> k.partitionValue("newId")));

        // The isNotEqualTo checks below would also pass for a missing (null) value, so first make sure
        // the second put really produced valid UUIDs.
        assertRecordHasValidUuid(afterUpdate);

        // UpdateBehavior.WRITE_IF_NOT_EXISTS , this gets changed because this is a put
        Assertions.assertThat(afterUpdate.getCreatedUuid()).isNotEqualTo(createdUuidAfterPut);

        // UpdateBehavior.WRITE_ALWAYS
        Assertions.assertThat(afterUpdate.getLastUpdatedUuid()).isNotEqualTo(lastUpdatedUuidAfterPut);
        Assertions.assertThat(afterUpdate.getFlattenedRecord().getGenerated()).isNotEqualTo(flattenedUuidAfterPut);
        Assertions.assertThat(afterUpdate.getAttribute()).isEqualTo("conditionalUpdate");
        Assertions.assertThat(afterUpdate.getId()).isEqualTo("newId");
    }

    /**
     * Updates an existing record with a condition that matches the stored item and verifies that the
     * WRITE_IF_NOT_EXISTS UUID is preserved while the other auto-generated UUIDs are regenerated.
     */
    @Test
    public void updateExistingRecordWithConditionExpressions() {
        mappedTable.putItem(r -> r.item(new Record().id("id").attribute("one")));
        Record result = mappedTable.getItem(r -> r.key(k -> k.partitionValue("id")));
        assertRecordHasValidUuid(result);
        String createdUuidAfterPut = result.getCreatedUuid();
        String lastUpdatedUuidAfterPut = result.getLastUpdatedUuid();
        String flattenedUuidAfterPut = result.getFlattenedRecord().getGenerated();

        // The stored attribute is "one", so the first alternative of the condition holds.
        Expression conditionExpression = Expression.builder()
                                                   .expression("#k = :v OR #k = :v1")
                                                   .putExpressionName("#k", "attribute")
                                                   .putExpressionValue(":v", stringValue("one"))
                                                   .putExpressionValue(":v1", stringValue("wrong2"))
                                                   .build();

        mappedTable.updateItem(r -> r.item(new Record().id("id").attribute("conditionalUpdate"))
                                     .conditionExpression(conditionExpression));
        Record afterUpdate = mappedTable.getItem(r -> r.key(k -> k.partitionValue("id")));

        // The isNotEqualTo checks below would also pass for a missing (null) value, so first make sure
        // the record still carries valid UUIDs after the update.
        assertRecordHasValidUuid(afterUpdate);

        // UpdateBehavior.WRITE_IF_NOT_EXISTS , the existing UUID is kept because this is an update
        Assertions.assertThat(afterUpdate.getCreatedUuid()).isEqualTo(createdUuidAfterPut);
        // UpdateBehavior.WRITE_ALWAYS
        Assertions.assertThat(afterUpdate.getLastUpdatedUuid()).isNotEqualTo(lastUpdatedUuidAfterPut);
        Assertions.assertThat(afterUpdate.getFlattenedRecord().getGenerated()).isNotEqualTo(flattenedUuidAfterPut);
        Assertions.assertThat(afterUpdate.getAttribute()).isEqualTo("conditionalUpdate");
        Assertions.assertThat(afterUpdate.getId()).isEqualTo("id");
    }

    /**
     * Verifies that a conditional put is rejected with {@link ConditionalCheckFailedException} when
     * neither alternative of the condition matches the stored "attribute" value.
     */
    @Test
    public void putItemConditionTestFailure() {
        mappedTable.putItem(r -> r.item(new Record().id("id").attribute("one")));

        // The stored attribute is "one"; both compared values differ from it.
        Expression mismatchedCondition = Expression.builder()
                                                   .expression("#k = :v OR #k = :v1")
                                                   .putExpressionName("#k", "attribute")
                                                   .putExpressionValue(":v", stringValue("wrong1"))
                                                   .putExpressionValue(":v1", stringValue("wrong2"))
                                                   .build();

        PutItemEnhancedRequest<Record> conditionalPut =
            PutItemEnhancedRequest.builder(Record.class)
                                  .item(new Record().id("id").attribute("one"))
                                  .conditionExpression(mismatchedCondition)
                                  .build();

        Assertions.assertThatExceptionOfType(ConditionalCheckFailedException.class)
                  .isThrownBy(() -> mappedTable.putItem(conditionalPut))
                  .withMessageContaining("The conditional request failed");
    }

    /**
     * Creates a record through an update, checks the returned UUIDs, and then verifies that a second
     * update is rejected with {@link ConditionalCheckFailedException} when its condition does not match
     * the stored "attribute" value.
     */
    @Test
    public void updateItemConditionTestFailure() {
        Record created = mappedTable.updateItem(r -> r.item(new Record().id("id").attribute("one")));
        assertRecordHasValidUuid(created);

        // The stored attribute is "one"; both compared values differ from it.
        Expression mismatchedCondition = Expression.builder()
                                                   .expression("#k = :v OR #k = :v1")
                                                   .putExpressionName("#k", "attribute")
                                                   .putExpressionValue(":v", stringValue("wrong1"))
                                                   .putExpressionValue(":v1", stringValue("wrong2"))
                                                   .build();
        Record conditionalUpdate = new Record().id("id").attribute("conditionalUpdate");

        Assertions.assertThatExceptionOfType(ConditionalCheckFailedException.class)
                  .isThrownBy(() -> mappedTable.updateItem(r -> r.item(conditionalUpdate)
                                                                 .conditionExpression(mismatchedCondition)))
                  .withMessageContaining("The conditional request failed");
    }

    /**
     * Builds a record whose id is a freshly generated random UUID string; all other fields are left unset.
     *
     * @return a new record with a random id
     */
    public static Record createUniqueFakeItem() {
        String randomId = UUID.randomUUID().toString();
        return new Record().id(randomId);
    }

    /**
     * Bean mapped by the tests. Each property has a JavaBean getter/setter pair plus a fluent setter
     * that returns {@code this}. {@code lastUpdatedUuid} and {@code createdUuid} are annotated as
     * auto-generated UUIDs ({@code createdUuid} additionally with WRITE_IF_NOT_EXISTS), and
     * {@code flattenedRecord} is flattened into the item.
     */
    @DynamoDbBean
    public static class Record {
        private String id;
        private String attribute;
        private String createdUuid;
        private String lastUpdatedUuid;
        private FlattenedRecord flattenedRecord;

        public Record() {
        }

        // ---- id (partition key) ----

        @DynamoDbPartitionKey
        public String getId() {
            return this.id;
        }

        public void setId(String id) {
            this.id = id;
        }

        public Record id(String id) {
            this.id = id;
            return this;
        }

        // ---- attribute ----

        public String getAttribute() {
            return attribute;
        }

        public void setAttribute(String attribute) {
            this.attribute = attribute;
        }

        public Record attribute(String attribute) {
            this.attribute = attribute;
            return this;
        }

        // ---- lastUpdatedUuid (auto-generated) ----

        @DynamoDbAutoGeneratedUuid
        public String getLastUpdatedUuid() {
            return lastUpdatedUuid;
        }

        public void setLastUpdatedUuid(String lastUpdatedUuid) {
            this.lastUpdatedUuid = lastUpdatedUuid;
        }

        public Record lastUpdatedUuid(String lastUpdatedUuid) {
            this.lastUpdatedUuid = lastUpdatedUuid;
            return this;
        }

        // ---- createdUuid (auto-generated, written only if absent) ----

        @DynamoDbAutoGeneratedUuid
        @DynamoDbUpdateBehavior(value = UpdateBehavior.WRITE_IF_NOT_EXISTS)
        public String getCreatedUuid() {
            return createdUuid;
        }

        public void setCreatedUuid(String createdUuid) {
            this.createdUuid = createdUuid;
        }

        public Record createdUuid(String createdUuid) {
            this.createdUuid = createdUuid;
            return this;
        }

        // ---- flattenedRecord ----

        @DynamoDbFlatten
        public FlattenedRecord getFlattenedRecord() {
            return flattenedRecord;
        }

        public void setFlattenedRecord(FlattenedRecord flattenedRecord) {
            this.flattenedRecord = flattenedRecord;
        }

        public Record flattenedRecord(FlattenedRecord flattenedRecord) {
            this.flattenedRecord = flattenedRecord;
            return this;
        }

        // ---- value semantics over all five fields ----

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (getClass() != o.getClass()) {
                return false;
            }
            Record other = (Record) o;
            if (!Objects.equals(id, other.id)) {
                return false;
            }
            if (!Objects.equals(attribute, other.attribute)) {
                return false;
            }
            if (!Objects.equals(lastUpdatedUuid, other.lastUpdatedUuid)) {
                return false;
            }
            if (!Objects.equals(createdUuid, other.createdUuid)) {
                return false;
            }
            return Objects.equals(flattenedRecord, other.flattenedRecord);
        }

        @Override
        public int hashCode() {
            return Objects.hash(id, attribute, lastUpdatedUuid, createdUuid, flattenedRecord);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Record{");
            text.append("id='").append(id).append('\'');
            text.append(", attribute='").append(attribute).append('\'');
            text.append(", createdUuid=").append(createdUuid);
            text.append(", lastUpdatedUuid=").append(lastUpdatedUuid);
            text.append(", flattenedRecord=").append(flattenedRecord);
            text.append('}');
            return text.toString();
        }
    }

    /**
     * Bean flattened into {@link Record}. Its single property, {@code generated}, is annotated as an
     * auto-generated UUID and has a JavaBean getter/setter pair plus a fluent setter.
     */
    @DynamoDbBean
    public static class FlattenedRecord {
        private String generated;

        @DynamoDbAutoGeneratedUuid
        public String getGenerated() {
            return generated;
        }

        public void setGenerated(String generated) {
            this.generated = generated;
        }

        public FlattenedRecord generated(String generated) {
            this.generated = generated;
            return this;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (getClass() != o.getClass()) {
                return false;
            }
            FlattenedRecord other = (FlattenedRecord) o;
            return Objects.equals(generated, other.generated);
        }

        @Override
        public int hashCode() {
            return Objects.hash(generated);
        }

        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("FlattenedRecord{");
            text.append("generated=").append(generated);
            text.append('}');
            return text.toString();
        }
    }
}
